{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Initiate H2OContext on top of Spark\n",
    "from pysparkling import *\n",
    "hc = H2OContext.getOrCreate(spark)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This is just helper function returning relative path to data files within sparkling-water project directories\n",
    "def _locate(example_name): \n",
    "    return \"../../../examples/smalldata/chicago/\" + example_name \n",
    "\n",
    "# Define file names\n",
    "chicagoAllWeather = \"chicagoAllWeather.csv\"\n",
    "chicagoCensus = \"chicagoCensus.csv\"\n",
    "chicagoCrimes10k = \"chicagoCrimes10k.csv.zip\"\n",
    "\n",
    "import h2o\n",
    "# h2o.import_file expects cluster-relative path\n",
    "f_weather = h2o.upload_file(_locate(chicagoAllWeather))\n",
    "f_census = h2o.upload_file(_locate(chicagoCensus))\n",
    "f_crimes = h2o.upload_file(_locate(chicagoCrimes10k))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "f_weather.show()\n",
    "f_census.show()\n",
    "f_crimes.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set time zone to UTC for date manipulation\n",
    "h2o.cluster().timezone = \"Etc/UTC\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Transform weather table\n",
    "## Remove 1st column (date)\n",
    "f_weather = f_weather[1:]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Transform census table\n",
    "## Remove all spaces from column names (causing problems in Spark SQL)\n",
    "col_names = map(lambda s: s.strip().replace(' ', '_').replace('+','_'), f_census.col_names)\n",
    "\n",
    "## Update column names in the table\n",
    "#f_weather.names = col_names\n",
    "f_census.names = col_names"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Transform crimes table\n",
    "\n",
    "## Drop useless columns\n",
    "f_crimes = f_crimes[2:]\n",
    "\n",
    "## Replace ' ' by '_' in column names\n",
    "col_names = map(lambda s: s.replace(' ', '_'), f_crimes.col_names)\n",
    "f_crimes.names = col_names\n",
    "\n",
    "## Refine date column\n",
    "def refine_date_col(data, col):\n",
    "    data[\"Day\"]       = data[col].day()\n",
    "    data[\"Month\"]     = data[col].month()\n",
    "    data[\"Year\"]      = data[col].year()\n",
    "    data[\"WeekNum\"]   = data[col].week()\n",
    "    data[\"WeekDay\"]   = data[col].dayOfWeek()\n",
    "    data[\"HourOfDay\"] = data[col].hour()\n",
    "        \n",
    "    # Create weekend and season cols\n",
    "    data[\"Weekend\"] = ((data[\"WeekDay\"] == \"Sun\") | (data[\"WeekDay\"] == \"Sat\"))\n",
    "    data[\"Season\"] = data[\"Month\"].cut([0, 2, 5, 7, 10, 12], [\"Winter\", \"Spring\", \"Summer\", \"Autumn\", \"Winter\"])\n",
    "    \n",
    "refine_date_col(f_crimes, \"Date\")\n",
    "f_crimes = f_crimes.drop(\"Date\")\n",
    "f_crimes.describe()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Expose H2O frames as Spark DataFrame\n",
    "df_weather = hc.as_spark_frame(f_weather)\n",
    "df_census = hc.as_spark_frame(f_census)\n",
    "df_crimes = hc.as_spark_frame(f_crimes)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_weather.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use Spark SQL to join datasets\n",
    "\n",
    "# Register DataFrames as tables\n",
    "df_weather.createOrReplaceTempView(\"chicagoWeather\")\n",
    "df_census.createOrReplaceTempView(\"chicagoCensus\")\n",
    "df_crimes.createOrReplaceTempView(\"chicagoCrime\")\n",
    "\n",
    "\n",
    "crimeWithWeather = spark.sql(\"\"\"SELECT\n",
    "a.Year, a.Month, a.Day, a.WeekNum, a.HourOfDay, a.Weekend, a.Season, a.WeekDay,\n",
    "a.IUCR, a.Primary_Type, a.Location_Description, a.Community_Area, a.District,\n",
    "a.Arrest, a.Domestic, a.Beat, a.Ward, a.FBI_Code,\n",
    "b.minTemp, b.maxTemp, b.meanTemp,\n",
    "c.PERCENT_AGED_UNDER_18_OR_OVER_64, c.PER_CAPITA_INCOME, c.HARDSHIP_INDEX,\n",
    "c.PERCENT_OF_HOUSING_CROWDED, c.PERCENT_HOUSEHOLDS_BELOW_POVERTY,\n",
    "c.PERCENT_AGED_16__UNEMPLOYED, c.PERCENT_AGED_25__WITHOUT_HIGH_SCHOOL_DIPLOMA\n",
    "FROM chicagoCrime a\n",
    "JOIN chicagoWeather b\n",
    "ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day\n",
    "JOIN chicagoCensus c\n",
    "ON a.Community_Area = c.Community_Area_Number\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "crimeWithWeather.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Publish Spark DataFrame as H2OFrame with given name\n",
    "crimeWithWeatherHF = hc.as_h2o_frame(crimeWithWeather, \"crimeWithWeatherTable\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Transform selected String columns to categoricals\n",
    "cat_cols = [\"Arrest\", \"Season\", \"WeekDay\", \"Primary_Type\", \"Location_Description\", \"Domestic\"]\n",
    "\n",
    "for col in cat_cols :\n",
    "    crimeWithWeatherHF[col] = crimeWithWeatherHF[col].asfactor()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Split frame into two - we use one as the training frame and the second one as the validation frame\n",
    "splits = crimeWithWeatherHF.split_frame(ratios=[0.8])\n",
    "train = splits[0]\n",
    "test = splits[1]\n",
    "\n",
    "# Prepare column names\n",
    "predictor_columns = train.drop(\"Arrest\").col_names\n",
    "response_column = \"Arrest\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create and train GBM model\n",
    "from h2o.estimators.gbm import H2OGradientBoostingEstimator\n",
    "\n",
    "# Prepare model based on the given set of parameters\n",
    "gbm_model = H2OGradientBoostingEstimator(ntrees       = 50,\n",
    "                                         max_depth    = 3,\n",
    "                                         learn_rate   = 0.1,\n",
    "                                         distribution = \"bernoulli\"\n",
    "                                        )\n",
    "\n",
    "# Train the model\n",
    "gbm_model.train(x            = predictor_columns,\n",
    "            y                = response_column,\n",
    "            training_frame   = train,\n",
    "            validation_frame = test\n",
    "         )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Show GBM model performance\n",
    "gbm_model.model_performance(test)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create and train deeplearning model\n",
    "from h2o.estimators.deeplearning import H2ODeepLearningEstimator\n",
    "\n",
    "# Prepare model based on the given set of parameters\n",
    "dl_model = H2ODeepLearningEstimator()\n",
    "\n",
    "# Train the model\n",
    "dl_model.train(x                = predictor_columns,\n",
    "               y                = response_column,\n",
    "               training_frame   = train,\n",
    "               validation_frame = test\n",
    "              )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Show deeplearning model performance\n",
    "dl_model.model_performance(test)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create crime class which is used as a data holder on which prediction is done\n",
    "from datetime import datetime\n",
    "from pytz import timezone\n",
    "from pyspark.sql import Row\n",
    "\n",
    "def get_season(dt):\n",
    "    if (dt >= 3 and dt <= 5):\n",
    "        return \"Spring\"\n",
    "    elif (dt >= 6 and dt <= 8):\n",
    "        return \"Summer\"\n",
    "    elif (dt >= 9 and dt <= 10):\n",
    "        return \"Autumn\"\n",
    "    else:       \n",
    "        return \"Winter\"\n",
    "    \n",
    "def crime(date,\n",
    "          iucr,\n",
    "          primaryType,\n",
    "          locationDescr,\n",
    "          domestic,\n",
    "          beat,\n",
    "          district,\n",
    "          ward,\n",
    "          communityArea,\n",
    "          fbiCode,\n",
    "          minTemp = 77777,\n",
    "          maxTemp = 77777,\n",
    "          meanTemp = 77777,\n",
    "          datePattern = \"%m/%d/%Y %I:%M:%S %p\",\n",
    "          dateTimeZone = \"Etc/UTC\"):\n",
    "\n",
    "    dt = datetime.strptime(date, datePattern)\n",
    "    dt.replace(tzinfo=timezone(dateTimeZone))\n",
    "\n",
    "    crime = Row(\n",
    "        Year = dt.year,\n",
    "        Month = dt.month,\n",
    "        Day = dt.day,\n",
    "        WeekNum = dt.isocalendar()[1],\n",
    "        HourOfDay = dt.hour,\n",
    "        Weekend = 1 if dt.weekday() == 5 or dt.weekday() == 6 else 0,\n",
    "        Season = get_season(dt.month),\n",
    "        WeekDay = dt.strftime('%a'),  #gets the day of week in short format - Mon, Tue ...\n",
    "        IUCR = iucr,\n",
    "        Primary_Type = primaryType,\n",
    "        Location_Description = locationDescr,\n",
    "        Domestic = \"true\" if domestic else \"false\",\n",
    "        Beat = beat,\n",
    "        District = district,\n",
    "        Ward = ward,\n",
    "        Community_Area = communityArea,\n",
    "        FBI_Code = fbiCode,\n",
    "        minTemp = minTemp,\n",
    "        maxTemp = maxTemp,\n",
    "        meanTemp = meanTemp\n",
    "    )\n",
    "    return crime"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create crime examples\n",
    "crime_examples = [\n",
    "  \n",
    "    crime(date=\"02/08/2015 11:43:58 PM\", iucr=1811, primaryType=\"NARCOTICS\", locationDescr=\"STREET\",\n",
    "          domestic=False, beat=422, district=4, ward=7, communityArea=46, fbiCode=18, \n",
    "          minTemp = 19, meanTemp=27, maxTemp=32),\n",
    "  \n",
    "    crime(date=\"02/08/2015 11:00:39 PM\", iucr=1150, primaryType=\"DECEPTIVE PRACTICE\", locationDescr=\"RESIDENCE\",\n",
    "          domestic=False, beat=923, district=9, ward=14, communityArea=63, fbiCode=11, \n",
    "          minTemp = 19, meanTemp=27, maxTemp=32)\n",
    "]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# For given crime and model return probability of crime.\n",
    "def score_event(crime, model, censusTable):\n",
    "    rdd = sc.parallelize([crime])\n",
    "    crime_frame = sqlContext.createDataFrame(rdd)\n",
    "    # Join table with census data\n",
    "    df_row = censusTable.join(crime_frame).where(\"Community_Area = Community_Area_Number\")  \n",
    "    row = hc.as_h2o_frame(df_row)\n",
    "    row[\"Season\"] = row[\"Season\"].asfactor()\n",
    "    row[\"WeekDay\"] = row[\"WeekDay\"].asfactor()\n",
    "    row[\"Primary_Type\"] = row[\"Primary_Type\"].asfactor()\n",
    "    row[\"Location_Description\"] = row[\"Location_Description\"].asfactor()\n",
    "    row[\"Domestic\"] = row[\"Domestic\"].asfactor()\n",
    "\n",
    "    predictTable = model.predict(row)\n",
    "    probOfArrest = predictTable[\"true\"][0,0]\n",
    "    return probOfArrest\n",
    "\n",
    "for i in crime_examples:\n",
    "    arrestProbGBM = 100*score_event(i, gbm_model, df_census)\n",
    "    arrestProbDLM = 100*score_event(i, dl_model, df_census)\n",
    "\n",
    "    print(\"\"\"\n",
    "       |Crime: \"\"\"+ str(i)+\"\"\"\n",
    "       |  Probability of arrest best on DeepLearning: \"\"\"+str(arrestProbDLM)+\"\"\"\n",
    "       |  Probability of arrest best on GBM: \"\"\"+str(arrestProbGBM)+\"\"\"\n",
    "        \"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
